---
title: "Parallel (Map-Reduce)"
description: "Fan-out/fan-in workflows that let multiple specialists work in parallel"
icon: arrows-split-up-and-left
---

![Parallel workflow diagram](/images/parallel-workflow.png)

## When to use it

- You need several specialists to analyse the same request from different angles (e.g. proofread, fact-check, style review).
- You want deterministic aggregation of multiple perspectives into a single response.
- You have functions or agents that can run concurrently and do not depend on each other’s intermediate state.
- You want a cheap way to blend deterministic helpers (regex, heuristics) with heavyweight LLM agents.

Common scenarios include content review with multiple rubrics, multi-source research where each worker hits a different MCP server, or evaluation workflows where you want a “best effort” vote across diverse models.

## How it works

`create_parallel_llm(...)` returns a [`ParallelLLM`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/parallel/parallel_llm.py) composed of two primitives:

- **FanOut** ([`fan_out.py`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/parallel/fan_out.py)) launches every worker concurrently. Workers can be `AgentSpec`, pre-built `Agent`/`AugmentedLLM`, or plain callables via `fan_out_functions`.
- **FanIn** ([`fan_in.py`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/parallel/fan_in.py)) collects the responses and hands a [`FanInInput`](https://github.com/lastmile-ai/mcp-agent/blob/main/src/mcp_agent/workflows/parallel/fan_in.py#L18) structure to your aggregator. The aggregator can also be an `AgentSpec`, an `AugmentedLLM`, or a deterministic function.

FanOut returns a dictionary keyed by worker name, so the aggregator always knows who produced which output. When the aggregator is an LLM, mcp-agent automatically enters the worker contexts, attaches LLMs, and tracks token usage per branch.
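To make that shape concrete, here is a hypothetical illustration of what the aggregator sees (worker names and strings are invented for this example; real entries hold the message objects each worker produced):

```python
# Hypothetical FanInInput contents: worker name -> list of that worker's
# outputs. The values shown are invented strings; in practice they are the
# message objects produced by each worker.
fan_in_input = {
    "proofreader": ["Fixed two typos; the second paragraph is unclear."],
    "fact_checker": ["The launch-date claim checks out against the cited page."],
    "style_enforcer": ["Tone matches the style guide except the closing line."],
}
```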

## Quick start

```python
import asyncio

from mcp_agent.app import MCPApp
from mcp_agent.workflows.factory import (
    AgentSpec,
    RequestParams,
    create_parallel_llm,
)

app = MCPApp(name="parallel_example")

async def main():
    async with app.run() as running_app:
        parallel = create_parallel_llm(
            name="content_review_parallel",
            fan_in=AgentSpec(
                name="grader",
                instruction=(
                    "Blend the reviewer feedback into a single report. "
                    "Call out disagreements and provide an overall score."
                ),
            ),
            fan_out=[
                AgentSpec(
                    name="proofreader",
                    instruction="Check grammar, spelling, and clarity.",
                ),
                AgentSpec(
                    name="fact_checker",
                    instruction="Verify factual claims using the fetch server.",
                    server_names=["fetch"],
                ),
                AgentSpec(
                    name="style_enforcer",
                    instruction="Compare against the company voice and style guide.",
                ),
            ],
            fan_out_functions=[
                lambda prompt: [
                    {
                        "title": "metadata",
                        "details": {
                            "characters": len(prompt),
                            "keywords": prompt.split()[:5],
                        },
                    }
                ]
            ],
            provider="openai",
            request_params=RequestParams(maxTokens=2000, temperature=0.2),
            context=running_app.context,
        )

        result = await parallel.generate_str("Review this customer email draft.")
        return result


if __name__ == "__main__":
    asyncio.run(main())
```

`create_parallel_llm` accepts `AgentSpec`s, already-instantiated `Agent`/`AugmentedLLM` objects, or plain Python callables. Every worker receives the same prompt, and the aggregator receives a structured summary of all responses. The workflow returns a list of `CreateMessageResult` objects from `generate`, or a single string from `generate_str`.
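For example, picking up `parallel` from the quick start, the two entry points differ only in how the aggregator's result is returned:

```python
# Continuing inside main() from the quick start above.
messages = await parallel.generate("Review this customer email draft.")
# -> list of CreateMessageResult objects from the aggregator

report = await parallel.generate_str("Review this customer email draft.")
# -> the same result flattened into a single string
```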

### Working with the aggregator input

FanIn accepts several input shapes (a dict keyed by worker name, or a plain list of responses). For example, you can pass a deterministic function as the aggregator:

```python
from mcp_agent.workflows.parallel.fan_in import FanInInput

def aggregate_as_markdown(messages: FanInInput) -> str:
    blocks = []
    for source, outputs in messages.items():
        lines = "\n".join(str(item) for item in outputs)
        blocks.append(f"### {source}\n{lines}")
    return "\n\n".join(blocks)

parallel = create_parallel_llm(
    fan_in=aggregate_as_markdown,
    fan_out=[AgentSpec(name="qa", instruction="Answer with citations.")],
    context=running_app.context,
)
```

When the aggregator is an agent/LLM, mcp-agent wraps the aggregated string in a system prompt that names each contributor, making it easy to ask for weighted votes, majority decisions, or summarised findings.
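A majority-vote aggregator, for instance, only needs a different instruction; the wiring is unchanged. A minimal sketch (the instruction text is illustrative, not a built-in prompt):

```python
from mcp_agent.workflows.factory import AgentSpec

# Hypothetical aggregator spec for majority voting; pass it as
# fan_in=majority_vote to create_parallel_llm.
majority_vote = AgentSpec(
    name="vote_counter",
    instruction=(
        "Each contribution below is labelled with the worker that produced it. "
        "Report the majority verdict and name any dissenting workers."
    ),
)
```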

## Operational tips

- **Throttle concurrency** by setting `executor.max_concurrent_activities` in `mcp_agent.config.yaml`. The parallel workflow uses the shared executor, so the limit applies across your entire app.
- **Mix deterministic helpers** via `fan_out_functions` for cheap signals (regex extractors, heuristics) alongside heavyweight LLM calls.
- **Inspect token/latency costs** with `await parallel.get_token_node()`—each fan-out worker appears as a child node, making it easy to spot the expensive branch.
- **Handle stragglers** by adjusting `RequestParams.timeoutSeconds` or adding retry logic inside the worker agents.
- **Reuse workers** by instantiating AugmentedLLMs once and passing them directly into `fan_out` to avoid repeated attachment overhead.
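A minimal sketch of the last tip, assuming the quick-start app and a configured `fetch` server; the agent is entered once and its attached `AugmentedLLM` is handed straight to `fan_out`:

```python
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.factory import AgentSpec, create_parallel_llm
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

# Inside main() from the quick start: attach the LLM once, then reuse it
# for every request instead of re-creating the worker per call.
fact_checker = Agent(
    name="fact_checker",
    instruction="Verify factual claims using the fetch server.",
    server_names=["fetch"],
)
async with fact_checker:
    fact_checker_llm = await fact_checker.attach_llm(OpenAIAugmentedLLM)

    parallel = create_parallel_llm(
        fan_in=AgentSpec(name="grader", instruction="Blend the feedback."),
        fan_out=[fact_checker_llm],  # pre-attached worker, no repeated setup
        context=running_app.context,
    )
    report = await parallel.generate_str("Review draft one.")
```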

## Example projects

- [workflow_parallel](https://github.com/lastmile-ai/mcp-agent/tree/main/examples/workflows/workflow_parallel) – proofreader/fact-checker/style-enforcer with graded aggregation.
- [Temporal parallel workflow](https://github.com/lastmile-ai/mcp-agent/tree/main/examples/temporal) – demonstrates durable fan-out execution using Temporal as the engine.

## Related reading

- [Workflow & decorators guide](/mcp-agent-sdk/core-components/workflows)
- [Agents and AugmentedLLMs](/mcp-agent-sdk/core-components/agents)
